package com.dm.cloud.controller;

import com.alibaba.excel.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.dm.cloud.api.vo.ChatMessageVo;
import com.dm.cloud.dao.ChatCacheDao;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 聊天室功能
 * 消息类型
 * 1、用户上线
 * 2、用户下线
 * 3、普通消息
 * 4、异地登陆
 * 内容类型
 * 1、普通文本
 * 2、图片
 * 3、视频
 **/

/**
 * TODO
 * 1、网关转发异常处理
 * 2、权鉴异常处理(导致用户ID需要前端传递)
 */
//注册成组件
@Component
@ServerEndpoint(value = "/websocket/{senderId}")
@Log4j2
@EnableScheduling
public class ChatOnlineWebSocket {

    private static ChatCacheDao chatCacheDao;

    @Autowired
    public void setChatCacheDao(ChatCacheDao chatCacheDao) {
        this.chatCacheDao = chatCacheDao;
    }

    //实例一个session，这个session是websocket的session
    private Session session;

    private String currentId = "";
 
    //存放在线的用户数量
    private static AtomicInteger userNumber = new AtomicInteger(0);

    //已经发送的消息
    final transient Object lock = new Object(); //锁
    private static List<ChatMessageVo> messageList = new ArrayList<>();

    //存放websocket的集合
    private static CopyOnWriteArraySet<ChatOnlineWebSocket> webSocketSet = new CopyOnWriteArraySet<>();

    private void uploadCache(){
        List<ChatMessageVo> insertData = new ArrayList<>();
        synchronized (lock){
            for (ChatMessageVo chatMessageVo : messageList) {
                insertData.add(chatMessageVo.clone());
            }
            messageList.clear();
        }

        //上传逻辑
        if(insertData.size() > 0) {
            chatCacheDao.batchInsert(insertData);
        }
    }

    @Bean
    private void uploadData(){
        AtomicReference<LocalDateTime> datetime = new AtomicReference<>(LocalDateTime.now());
        new Thread(()->{
            while(true) {
                if (messageList.size() > 100) {
                    //消息超过100缓存
                    uploadCache();
                    //刷新缓存时间
                    datetime.set(LocalDateTime.now());
                } else {
                    Duration duration = Duration.between(datetime.get(), LocalDateTime.now());
                    long count = duration.toMillis();
                    if (count >= 10000) {
                        //消息数量没超过100 每十秒缓存一次
                        uploadCache();
                        //刷新缓存时间
                        datetime.set(LocalDateTime.now());
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception ex) {
                    log.error("TimeUnit.SECONDS.sleep ERROR");
                }
            }
        }).start();
    }

    @Scheduled(cron = "0/5 * * * * ?")
    private void configureTasks() throws Exception{
        if(session!=null && session.isOpen()) {
            session.getBasicRemote().sendText("链接成功");
        }
    }

    //前端请求时一个websocket时,从登陆信息中获取用户信息
    @OnOpen
    public void onOpen(Session session,@PathParam("senderId") String userId) throws IOException {

        this.session = session;

        //保存当前用户信息
//        this.getCurrentUser((user)->{
//            currentId = String.valueOf(user.getId());
//        });
//        用户ID暂时由前端传递
        currentId = userId;

        //获取已经登陆的用户信息 并 判断当前用户是不是已经登陆过了
        Set<String> userLists = new TreeSet<>();
        boolean hasLogin = false;
        for (ChatOnlineWebSocket webSocket : webSocketSet) {
            if (webSocket.currentId.equals(this.currentId)) {
                //表示已经登陆过了 通知之前的用户下线
                ChatMessageVo chatMessageVo = new ChatMessageVo();
                //主要是针对自己上线后要初始化 在线列表
                chatMessageVo.setMessageType(4);
                chatMessageVo.setSenderId(this.currentId);
                chatMessageVo.setMessage("异地登陆");
                //先给旧的登录地址发送推出信息
                sendMessageTo(JSON.toJSONString(chatMessageVo), this.currentId);
                //用户列表添加
                userLists.add(webSocket.currentId);
            } else {
                userLists.add(webSocket.currentId);
            }
        }
        if(!hasLogin){
            //增加在线人数
            userNumber.addAndGet(1);
            userLists.add(this.currentId);
        }
        //通知之后 将当前对象放入webSocketSet
        webSocketSet.add(this);
        //将所有信息包装好传到客户端(给所有用户)
        ChatMessageVo chatMessageVo = new ChatMessageVo();
        //主要是针对自己上线后要初始化 在线列表
        chatMessageVo.setUserLists(userLists);
        chatMessageVo.setMessageType(1);
        chatMessageVo.setSenderId(this.currentId);
        chatMessageVo.setMessage("用户上线");
        chatMessageVo.setNumber(this.userNumber.get());
        //发送给所有用户谁上线了，并让他们更新自己的用户菜单
        sendMessageAll(JSON.toJSONString(chatMessageVo));
        log.info("【websocket消息】有新的连接, 总数:{}", this.userNumber);
    }
 
    //前端关闭时一个websocket时
    @OnClose
    public void onClose() throws IOException {
        //从集合中移除当前对象
        webSocketSet.remove(this);
        //在线用户数减少
        userNumber.addAndGet(-1);

        //将所有信息包装好传到客户端(给所有用户)
        ChatMessageVo chatMessageVo = new ChatMessageVo();
        chatMessageVo.setMessageType(2);
        chatMessageVo.setMessage("用户下线");
        chatMessageVo.setSenderId(this.currentId);
        chatMessageVo.setNumber(this.userNumber.get());
        //发送用户下线信息 让哥用户更新在线列表
        sendMessageAll(JSON.toJSONString(chatMessageVo));
        log.info("【websocket消息】连接断开, 总数:{}", webSocketSet.size());
    }
 
    //前端向后端发送消息
    @OnMessage
    public void onMessage(String message) throws IOException {
        log.info("【websocket消息】收到客户端发来的消息:{}", message);
        //将前端传来的数据进行转型
        com.alibaba.fastjson.JSONObject jsonObject = JSON.parseObject(message);
        //消息文本
        String textMessage = jsonObject.getString("message");
        //消息文本
        String uuid = jsonObject.getString("uuid");

        String receiverId = null;
        //判断是不是私发

        try {
            receiverId = jsonObject.getString("receiverId");
        }finally {
            if(StringUtils.isEmpty(receiverId)){
                throw new RuntimeException("消息缺少接收对象！");
            }
        }

        ChatMessageVo chatMessageVo = new ChatMessageVo();
        chatMessageVo.setMessageType(3);
        chatMessageVo.setSenderId(this.currentId);

        chatMessageVo.setReceiverId(receiverId);
        chatMessageVo.setMessage(textMessage);
        chatMessageVo.setUuid(uuid);

        if(!receiverId.startsWith("group_")){
            //私发
            chatMessageVo.setReceiverId(receiverId);
            sendMessageTo(JSON.toJSONString(chatMessageVo),receiverId);
            //给自己也得发送一份 主要用于告知自己消息已经发送成功
            sendMessageTo(JSON.toJSONString(chatMessageVo),this.currentId);
        }else{
            //群发
            //目前就一个群 默认使用这个ID 所有人都在这个群里面
            chatMessageVo.setReceiverId(receiverId);
            sendMessageAll(JSON.toJSONString(chatMessageVo));
        }

        //先缓存记录消息
        synchronized (lock){
            messageList.add(chatMessageVo);
        }
    }
 
    /**
     *  消息发送所有人
     */
    public void sendMessageAll(String message) throws IOException {
        for (ChatOnlineWebSocket webSocket: webSocketSet) {
            //消息发送所有人（同步）getAsyncRemote
            webSocket.session.getBasicRemote().sendText(message);
        }
    }
 
    /**
     *  消息发送指定人
     */
    public void sendMessageTo(String message, String toUserId) throws IOException {
        //遍历所有用户
        for (ChatOnlineWebSocket webSocket : webSocketSet) {
            if (webSocket.currentId.equals(toUserId)) {
                //消息发送指定人
                webSocket.session.getBasicRemote().sendText(message);
                log.info("【发送消息】:", this.currentId+"向"+webSocket.currentId+"发送消息："+message);
                break;
            }
        }
    }

    /**
     * 获取当前用户后的操作
     * @param consumer
     */
//    private void getCurrentUser(Consumer<Users> consumer){
//        //调用auth服务
//        R<Users> user = authFeign.getCurrentUser(CloudConstant.AUTH_REQUEST_UUID);
//        if("success".equals(user.getType())
//                && CloudConstant.RESPONSE_SUCCESS_CODE == user.getCode()
//                && Optional.ofNullable(user.getResult()).isPresent()) {
//            Users userDto = user.getResult();
//            if(null != consumer){
//                consumer.accept(userDto);
//            }
//        }else{
//            throw new RuntimeException("用户信息获取失败，请重新登陆尝试");
//        }
//    }

}
 